package wal

import (
	"bufio"
	"context"
	"encoding/binary"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/snappy"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"

	"github.com/influxdata/influxdb/v2"
	"github.com/influxdata/influxdb/v2/kit/tracing"
	"github.com/influxdata/influxdb/v2/pkg/limiter"
	"github.com/influxdata/influxdb/v2/pkg/pool"
	"github.com/influxdata/influxdb/v2/tsdb/value"
)

const (
	// DefaultSegmentSize of 10MB is the size at which segment files will be rolled over.
	DefaultSegmentSize = 10 * 1024 * 1024

	// WALFileExtension is the file extension we expect for wal segments.
	WALFileExtension = "wal"

	// WALFilePrefix is the prefix on all wal segment files.
	WALFilePrefix = "_"

	// walEncodeBufSize is the size of the wal entry encoding buffer
	walEncodeBufSize = 4 * 1024 * 1024

	float64EntryType  = 1
	integerEntryType  = 2
	booleanEntryType  = 3
	stringEntryType   = 4
	unsignedEntryType = 5
)

// WalEntryType is a byte written to a wal segment file that indicates what the following compressed block contains.
type WalEntryType byte

const (
	// WriteWALEntryType indicates a write entry.
	WriteWALEntryType WalEntryType = 0x01

	// DeleteWALEntryType indicates a delete entry. Deprecated.
	_ WalEntryType = 0x02

	// DeleteRangeWALEntryType indicates a delete range entry. Deprecated.
	_ WalEntryType = 0x03

	// DeleteBucketRangeWALEntryType indicates a delete bucket range entry.
	DeleteBucketRangeWALEntryType WalEntryType = 0x04

	// DeleteBucketRangeKeepSeriesWALEntryType indicates a delete bucket range entry
	// but the underlying series are not deleted.
	DeleteBucketRangeKeepSeriesWALEntryType WalEntryType = 0x05
)

var (
	// ErrWALClosed is returned when attempting to write to a closed WAL file.
	ErrWALClosed = fmt.Errorf("WAL closed")

	// ErrWALCorrupt is returned when reading a corrupt WAL entry.
	ErrWALCorrupt = fmt.Errorf("corrupted WAL entry")

	defaultWaitingWALWrites = runtime.GOMAXPROCS(0) * 2

	// bytePool is a shared bytes pool buffer re-cycle []byte slices to reduce allocations.
	bytesPool = pool.NewLimitedBytes(256, walEncodeBufSize*2)
)

// WAL represents the write-ahead log used for writing TSM files.
type WAL struct {
	// goroutines waiting for the next fsync
	syncCount   uint64
	syncWaiters chan chan error

	mu            sync.RWMutex
	lastWriteTime time.Time

	path    string
	enabled bool

	// write variables
	currentSegmentID     int
	currentSegmentWriter *WALSegmentWriter

	// cache and flush variables
	once    sync.Once
	closing chan struct{}

	// syncDelay sets the duration to wait before fsyncing writes.  A value of 0 (default)
	// will cause every write to be fsync'd.  This must be set before the WAL
	// is opened if a non-default value is required.
	syncDelay time.Duration

	// WALOutput is the writer used by the logger.
	logger *zap.Logger // Logger to be used for important messages

	// SegmentSize is the file size at which a segment file will be rotated
	SegmentSize int

	tracker             *walTracker
	defaultMetricLabels prometheus.Labels // N.B this must not be mutated after Open is called.

	limiter limiter.Fixed
}

// NewWAL initializes a new WAL at the given directory.
func NewWAL(path string) *WAL {
	logger := zap.NewNop()
	return &WAL{
		path:    path,
		enabled: true,

		// these options should be overridden by any options in the config
		SegmentSize: DefaultSegmentSize,
		closing:     make(chan struct{}),
		syncWaiters: make(chan chan error, 1024),
		limiter:     limiter.NewFixed(defaultWaitingWALWrites),
		logger:      logger,
	}
}

// WithFsyncDelay sets the fsync delay and should be called before the WAL is opened.
func (l *WAL) WithFsyncDelay(delay time.Duration) {
	l.syncDelay = delay
}

// SetEnabled sets if the WAL is enabled and should be called before the WAL is opened.
func (l *WAL) SetEnabled(enabled bool) {
	l.enabled = enabled
}

// WithLogger sets the WAL's logger.
func (l *WAL) WithLogger(log *zap.Logger) {
	l.logger = log.With(zap.String("service", "wal"))
}

// SetDefaultMetricLabels sets the default labels for metrics on the engine.
// It must be called before the Engine is opened.
func (l *WAL) SetDefaultMetricLabels(labels prometheus.Labels) {
	l.defaultMetricLabels = make(prometheus.Labels, len(labels))
	for k, v := range labels {
		l.defaultMetricLabels[k] = v
	}
}

// Path returns the directory the log was initialized with.
func (l *WAL) Path() string {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.path
}

// Open opens and initializes the Log. Open can recover from previous unclosed shutdowns.
func (l *WAL) Open(ctx context.Context) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	if !l.enabled {
		return nil
	}

	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	span.LogKV("segment_size", l.SegmentSize,
		"path", l.path)

	// Initialise metrics for trackers.
	mmu.Lock()
	if wms == nil {
		wms = newWALMetrics(l.defaultMetricLabels)
	}
	mmu.Unlock()

	// Set the shared metrics for the tracker
	l.tracker = newWALTracker(wms, l.defaultMetricLabels)

	if err := os.MkdirAll(l.path, 0777); err != nil {
		return err
	}

	segments, err := SegmentFileNames(l.path)
	if err != nil {
		return err
	}
	l.tracker.SetSegments(uint64(len(segments)))

	if len(segments) > 0 {
		lastSegment := segments[len(segments)-1]
		id, err := idFromFileName(lastSegment)
		if err != nil {
			return err
		}

		l.currentSegmentID = id
		stat, err := os.Stat(lastSegment)
		if err != nil {
			return err
		}

		if stat.Size() == 0 {
			os.Remove(lastSegment)
			segments = segments[:len(segments)-1]
			l.tracker.DecSegments()
		} else {
			fd, err := os.OpenFile(lastSegment, os.O_RDWR, 0666)
			if err != nil {
				return err
			}
			if _, err := fd.Seek(0, io.SeekEnd); err != nil {
				return err
			}
			l.currentSegmentWriter = NewWALSegmentWriter(fd)

			// Reset the current segment size stat
			l.tracker.SetCurrentSegmentSize(uint64(stat.Size()))
		}
	}

	var totalOldDiskSize int64
	for _, seg := range segments {
		stat, err := os.Stat(seg)
		if err != nil {
			return err
		}

		if stat.Size() > 0 {
			totalOldDiskSize += stat.Size()
			if stat.ModTime().After(l.lastWriteTime) {
				l.lastWriteTime = stat.ModTime().UTC()
			}
		}
	}
	l.tracker.SetOldSegmentSize(uint64(totalOldDiskSize))

	l.closing = make(chan struct{})

	return nil
}

// scheduleSync will schedule an fsync to the current wal segment and notify any
// waiting gorutines.  If an fsync is already scheduled, subsequent calls will
// not schedule a new fsync and will be handle by the existing scheduled fsync.
func (l *WAL) scheduleSync() {
	// If we're not the first to sync, then another goroutine is fsyncing the wal for us.
	if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {
		return
	}

	// Fsync the wal and notify all pending waiters
	go func() {
		var timerCh <-chan time.Time

		// time.NewTicker requires a > 0 delay, since 0 indicates no delay, use a closed
		// channel which will always be ready to read from.
		if l.syncDelay == 0 {
			// Create a RW chan and close it
			timerChrw := make(chan time.Time)
			close(timerChrw)
			// Convert it to a read-only
			timerCh = timerChrw
		} else {
			t := time.NewTicker(l.syncDelay)
			defer t.Stop()
			timerCh = t.C
		}
		for {
			select {
			case <-timerCh:
				l.mu.Lock()
				if len(l.syncWaiters) == 0 {
					atomic.StoreUint64(&l.syncCount, 0)
					l.mu.Unlock()
					return
				}

				l.sync()
				l.mu.Unlock()
			case <-l.closing:
				atomic.StoreUint64(&l.syncCount, 0)
				return
			}
		}
	}()
}

// sync fsyncs the current wal segments and notifies any waiters.  Callers must ensure
// a write lock on the WAL is obtained before calling sync.
func (l *WAL) sync() {
	err := l.currentSegmentWriter.sync()
	for len(l.syncWaiters) > 0 {
		errC := <-l.syncWaiters
		errC <- err
	}
}

// WriteMulti writes the given values to the WAL. It returns the WAL segment ID to
// which the points were written. If an error is returned the segment ID should
// be ignored. If the WAL is disabled, -1 and nil is returned.
func (l *WAL) WriteMulti(ctx context.Context, values map[string][]value.Value) (int, error) {
	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	if !l.enabled {
		return -1, nil
	}

	entry := &WriteWALEntry{
		Values: values,
	}

	id, err := l.writeToLog(entry)
	if err != nil {
		l.tracker.IncWritesErr()
		return -1, err
	}
	l.tracker.IncWritesOK()

	return id, nil
}

// ClosedSegments returns a slice of the names of the closed segment files.
func (l *WAL) ClosedSegments() ([]string, error) {
	if !l.enabled {
		return nil, nil
	}

	l.mu.RLock()
	defer l.mu.RUnlock()

	// Not loading files from disk so nothing to do
	if l.path == "" {
		return nil, nil
	}

	var currentFile string
	if l.currentSegmentWriter != nil {
		currentFile = l.currentSegmentWriter.path()
	}

	files, err := SegmentFileNames(l.path)
	if err != nil {
		return nil, err
	}

	var closedFiles []string
	for _, fn := range files {
		// Skip the current path
		if fn == currentFile {
			continue
		}

		closedFiles = append(closedFiles, fn)
	}

	return closedFiles, nil
}

// Remove deletes the given segment file paths from disk and cleans up any associated objects.
func (l *WAL) Remove(ctx context.Context, files []string) error {
	if !l.enabled {
		return nil
	}

	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	l.mu.Lock()
	defer l.mu.Unlock()

	for i, fn := range files {
		span.LogKV(fmt.Sprintf("path-%d", i), fn)
		os.RemoveAll(fn)
	}

	// Refresh the on-disk size stats
	segments, err := SegmentFileNames(l.path)
	if err != nil {
		return err
	}
	l.tracker.SetSegments(uint64(len(segments)))

	var totalOldDiskSize int64
	for _, seg := range segments {
		stat, err := os.Stat(seg)
		if err != nil {
			return err
		}

		totalOldDiskSize += stat.Size()
	}
	l.tracker.SetOldSegmentSize(uint64(totalOldDiskSize))
	return nil
}

// LastWriteTime is the last time anything was written to the WAL.
func (l *WAL) LastWriteTime() time.Time {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.lastWriteTime
}

// DiskSizeBytes returns the on-disk size of the WAL.
func (l *WAL) DiskSizeBytes() int64 {
	return int64(l.tracker.OldSegmentSize() + l.tracker.CurrentSegmentSize())
}

func (l *WAL) writeToLog(entry WALEntry) (int, error) {
	// limit how many concurrent encodings can be in flight.  Since we can only
	// write one at a time to disk, a slow disk can cause the allocations below
	// to increase quickly.  If we're backed up, wait until others have completed.
	bytes := bytesPool.Get(entry.MarshalSize())

	b, err := entry.Encode(bytes)
	if err != nil {
		bytesPool.Put(bytes)
		return -1, err
	}

	encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))

	compressed := snappy.Encode(encBuf, b)
	bytesPool.Put(bytes)

	syncErr := make(chan error)

	segID, err := func() (int, error) {
		l.mu.Lock()
		defer l.mu.Unlock()

		// Make sure the log has not been closed
		select {
		case <-l.closing:
			return -1, ErrWALClosed
		default:
		}

		// roll the segment file if needed
		if err := l.rollSegment(); err != nil {
			return -1, fmt.Errorf("error rolling WAL segment: %v", err)
		}

		// write and sync
		if err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {
			return -1, fmt.Errorf("error writing WAL entry: %v", err)
		}

		select {
		case l.syncWaiters <- syncErr:
		default:
			return -1, fmt.Errorf("error syncing wal")
		}
		l.scheduleSync()

		// Update stats for current segment size
		l.tracker.SetCurrentSegmentSize(uint64(l.currentSegmentWriter.size))
		l.lastWriteTime = time.Now().UTC()

		return l.currentSegmentID, nil

	}()

	bytesPool.Put(encBuf)

	if err != nil {
		return segID, err
	}

	// schedule an fsync and wait for it to complete
	return segID, <-syncErr
}

// rollSegment checks if the current segment is due to roll over to a new segment;
// and if so, opens a new segment file for future writes.
func (l *WAL) rollSegment() error {
	if l.currentSegmentWriter == nil || l.currentSegmentWriter.size > DefaultSegmentSize {
		if err := l.newSegmentFile(); err != nil {
			// A drop database or RP call could trigger this error if writes were in-flight
			// when the drop statement executes.
			return fmt.Errorf("error opening new segment file for wal (2): %v", err)
		}
		return nil
	}

	return nil
}

// CloseSegment closes the current segment if it is non-empty and opens a new one.
func (l *WAL) CloseSegment() error {
	if !l.enabled {
		return nil
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	if l.currentSegmentWriter == nil || l.currentSegmentWriter.size > 0 {
		if err := l.newSegmentFile(); err != nil {
			// A drop database or RP call could trigger this error if writes were in-flight
			// when the drop statement executes.
			return fmt.Errorf("error opening new segment file for wal (1): %v", err)
		}
		return nil
	}
	return nil
}

// DeleteBucketRange deletes the data inside of the bucket between the two times, returning
// the segment ID for the operation.
func (l *WAL) DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64, pred []byte) (int, error) {
	if !l.enabled {
		return -1, nil
	}

	entry := &DeleteBucketRangeWALEntry{
		OrgID:     orgID,
		BucketID:  bucketID,
		Min:       min,
		Max:       max,
		Predicate: pred,
	}

	id, err := l.writeToLog(entry)
	if err != nil {
		return -1, err
	}
	return id, nil
}

// Close will finish any flush that is currently in progress and close file handles.
func (l *WAL) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()

	if !l.enabled {
		return nil
	}

	l.once.Do(func() {
		span, _ := tracing.StartSpanFromContextWithOperationName(context.Background(), "WAL.Close once.Do")
		defer span.Finish()

		span.LogKV("path", l.path)

		// Close, but don't set to nil so future goroutines can still be signaled
		close(l.closing)

		if l.currentSegmentWriter != nil {
			l.sync()
			l.currentSegmentWriter.close()
			l.currentSegmentWriter = nil
		}
	})

	return nil
}

// SegmentFileNames will return all files that are WAL segment files in sorted order by ascending ID.
func SegmentFileNames(dir string) ([]string, error) {
	names, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension)))
	if err != nil {
		return nil, err
	}
	sort.Strings(names)
	return names, nil
}

// newSegmentFile will close the current segment file and open a new one, updating bookkeeping info on the log.
func (l *WAL) newSegmentFile() error {
	l.currentSegmentID++
	if l.currentSegmentWriter != nil {
		l.sync()

		if err := l.currentSegmentWriter.close(); err != nil {
			return err
		}
		l.tracker.SetOldSegmentSize(uint64(l.currentSegmentWriter.size))
	}

	fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
	fd, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
	if err != nil {
		return err
	}
	l.currentSegmentWriter = NewWALSegmentWriter(fd)
	l.tracker.IncSegments()

	// Reset the current segment size stat
	l.tracker.SetCurrentSegmentSize(0)
	return nil
}

// walTracker tracks writes to the WAL.
//
// As well as being responsible for providing atomic reads and writes to the
// statistics, walTracker also mirrors any changes to the external prometheus
// metrics, which the Engine exposes.
//
// *NOTE* - walTracker fields should not be directory modified. Doing so
// could result in the Engine exposing inaccurate metrics.
type walTracker struct {
	metrics         *walMetrics
	labels          prometheus.Labels
	oldSegmentBytes uint64
}

func newWALTracker(metrics *walMetrics, defaultLabels prometheus.Labels) *walTracker {
	return &walTracker{metrics: metrics, labels: defaultLabels}
}

// Labels returns a copy of the default labels used by the tracker's metrics.
// The returned map is safe for modification.
func (t *walTracker) Labels() prometheus.Labels {
	labels := make(prometheus.Labels, len(t.labels))
	for k, v := range t.labels {
		labels[k] = v
	}
	return labels
}

// IncWrites increments the number of writes to the cache, with a required status.
func (t *walTracker) IncWrites(status string) {
	labels := t.Labels()
	labels["status"] = status
	t.metrics.Writes.With(labels).Inc()
}

// IncWritesOK increments the number of successful writes.
func (t *walTracker) IncWritesOK() { t.IncWrites("ok") }

// IncWritesError increments the number of writes that encountered an error.
func (t *walTracker) IncWritesErr() { t.IncWrites("error") }

// SetOldSegmentSize sets the size of all old segments on disk.
func (t *walTracker) SetOldSegmentSize(sz uint64) {
	atomic.StoreUint64(&t.oldSegmentBytes, sz)

	labels := t.labels
	t.metrics.OldSegmentBytes.With(labels).Set(float64(sz))
}

// OldSegmentSize returns the on-disk size of all old segments.
func (t *walTracker) OldSegmentSize() uint64 { return atomic.LoadUint64(&t.oldSegmentBytes) }

// SetCurrentSegmentSize sets the size of all old segments on disk.
func (t *walTracker) SetCurrentSegmentSize(sz uint64) {
	atomic.StoreUint64(&t.oldSegmentBytes, sz)

	labels := t.labels
	t.metrics.CurrentSegmentBytes.With(labels).Set(float64(sz))
}

// CurrentSegmentSize returns the on-disk size of all old segments.
func (t *walTracker) CurrentSegmentSize() uint64 { return atomic.LoadUint64(&t.oldSegmentBytes) }

// SetSegments sets the number of segments files on disk.
func (t *walTracker) SetSegments(sz uint64) {
	labels := t.labels
	t.metrics.Segments.With(labels).Set(float64(sz))
}

// IncSegments increases the number of segments files by one.
func (t *walTracker) IncSegments() {
	labels := t.labels
	t.metrics.Segments.With(labels).Inc()
}

// DecSegments decreases the number of segments files by one.
func (t *walTracker) DecSegments() {
	labels := t.labels
	t.metrics.Segments.With(labels).Dec()
}

// WALEntry is record stored in each WAL segment.  Each entry has a type
// and an opaque, type dependent byte slice data attribute.
type WALEntry interface {
	Type() WalEntryType
	Encode(dst []byte) ([]byte, error)
	MarshalBinary() ([]byte, error)
	UnmarshalBinary(b []byte) error
	MarshalSize() int
}

// WriteWALEntry represents a write of points.
type WriteWALEntry struct {
	Values map[string][]value.Value
	sz     int
}

// MarshalSize returns the number of bytes the entry takes when marshaled.
func (w *WriteWALEntry) MarshalSize() int {
	if w.sz > 0 || len(w.Values) == 0 {
		return w.sz
	}

	encLen := 7 * len(w.Values) // Type (1), Key Length (2), and Count (4) for each key

	// determine required length
	for k, v := range w.Values {
		encLen += len(k)
		if len(v) == 0 {
			return 0
		}

		encLen += 8 * len(v) // timestamps (8)

		switch v[0].(type) {
		case value.FloatValue, value.IntegerValue, value.UnsignedValue:
			encLen += 8 * len(v)
		case value.BooleanValue:
			encLen += 1 * len(v)
		case value.StringValue:
			for _, vv := range v {
				str, ok := vv.(value.StringValue)
				if !ok {
					return 0
				}
				encLen += 4 + len(str.RawValue())
			}
		default:
			return 0
		}
	}

	w.sz = encLen

	return w.sz
}

// Encode converts the WriteWALEntry into a byte stream using dst if it
// is large enough.  If dst is too small, the slice will be grown to fit the
// encoded entry.
func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {
	// The entries values are encode as follows:
	//
	// For each key and slice of values, first a 1 byte type for the []Values
	// slice is written.  Following the type, the length and key bytes are written.
	// Following the key, a 4 byte count followed by each value as a 8 byte time
	// and N byte value.  The value is dependent on the type being encoded.  float64,
	// int64, use 8 bytes, boolean uses 1 byte, and string is similar to the key encoding,
	// except that string values have a 4-byte length, and keys only use 2 bytes.
	//
	// This structure is then repeated for each key an value slices.
	//
	// ┌────────────────────────────────────────────────────────────────────┐
	// │                           WriteWALEntry                            │
	// ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤
	// │ Type │ Key Len │   Key  │ Count │  Time   │  Value  │...│ Type │...│
	// │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │   │1 byte│   │
	// └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘

	encLen := w.MarshalSize() // Type (1), Key Length (2), and Count (4) for each key

	// allocate or re-slice to correct size
	if len(dst) < encLen {
		dst = make([]byte, encLen)
	} else {
		dst = dst[:encLen]
	}

	// Finally, encode the entry
	var n int
	var curType byte

	for k, v := range w.Values {
		switch v[0].(type) {
		case value.FloatValue:
			curType = float64EntryType
		case value.IntegerValue:
			curType = integerEntryType
		case value.UnsignedValue:
			curType = unsignedEntryType
		case value.BooleanValue:
			curType = booleanEntryType
		case value.StringValue:
			curType = stringEntryType
		default:
			return nil, fmt.Errorf("unsupported value type: %T", v[0])
		}
		dst[n] = curType
		n++

		binary.BigEndian.PutUint16(dst[n:n+2], uint16(len(k)))
		n += 2
		n += copy(dst[n:], k)

		binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(v)))
		n += 4

		for _, vv := range v {
			binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.UnixNano()))
			n += 8

			switch vv := vv.(type) {
			case value.FloatValue:
				if curType != float64EntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.RawValue()))
				n += 8
			case value.IntegerValue:
				if curType != integerEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.RawValue()))
				n += 8
			case value.UnsignedValue:
				if curType != unsignedEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.RawValue()))
				n += 8
			case value.BooleanValue:
				if curType != booleanEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				if vv.RawValue() {
					dst[n] = 1
				} else {
					dst[n] = 0
				}
				n++
			case value.StringValue:
				if curType != stringEntryType {
					return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)
				}
				binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(vv.RawValue())))
				n += 4
				n += copy(dst[n:], vv.RawValue())
			default:
				return nil, fmt.Errorf("unsupported value found in %T slice: %T", v[0].Value(), vv)
			}
		}
	}

	return dst[:n], nil
}

// MarshalBinary returns a binary representation of the entry in a new byte slice.
func (w *WriteWALEntry) MarshalBinary() ([]byte, error) {
	// Temp buffer to write marshaled points into
	b := make([]byte, w.MarshalSize())
	return w.Encode(b)
}

// UnmarshalBinary deserializes the byte slice into w.
func (w *WriteWALEntry) UnmarshalBinary(b []byte) error {
	var i int
	for i < len(b) {
		typ := b[i]
		i++

		if i+2 > len(b) {
			return ErrWALCorrupt
		}

		length := int(binary.BigEndian.Uint16(b[i : i+2]))
		i += 2

		if i+length > len(b) {
			return ErrWALCorrupt
		}

		k := string(b[i : i+length])
		i += length

		if i+4 > len(b) {
			return ErrWALCorrupt
		}

		nvals := int(binary.BigEndian.Uint32(b[i : i+4]))
		i += 4

		if nvals <= 0 || nvals > len(b) {
			return ErrWALCorrupt
		}

		switch typ {
		case float64EntryType:
			if i+16*nvals > len(b) {
				return ErrWALCorrupt
			}

			values := make([]value.Value, 0, nvals)
			for j := 0; j < nvals; j++ {
				un := int64(binary.BigEndian.Uint64(b[i : i+8]))
				i += 8
				v := math.Float64frombits((binary.BigEndian.Uint64(b[i : i+8])))
				i += 8
				values = append(values, value.NewFloatValue(un, v))
			}
			w.Values[k] = values
		case integerEntryType:
			if i+16*nvals > len(b) {
				return ErrWALCorrupt
			}

			values := make([]value.Value, 0, nvals)
			for j := 0; j < nvals; j++ {
				un := int64(binary.BigEndian.Uint64(b[i : i+8]))
				i += 8
				v := int64(binary.BigEndian.Uint64(b[i : i+8]))
				i += 8
				values = append(values, value.NewIntegerValue(un, v))
			}
			w.Values[k] = values

		case unsignedEntryType:
			if i+16*nvals > len(b) {
				return ErrWALCorrupt
			}

			values := make([]value.Value, 0, nvals)
			for j := 0; j < nvals; j++ {
				un := int64(binary.BigEndian.Uint64(b[i : i+8]))
				i += 8
				v := binary.BigEndian.Uint64(b[i : i+8])
				i += 8
				values = append(values, value.NewUnsignedValue(un, v))
			}
			w.Values[k] = values

		case booleanEntryType:
			if i+9*nvals > len(b) {
				return ErrWALCorrupt
			}

			values := make([]value.Value, 0, nvals)
			for j := 0; j < nvals; j++ {
				un := int64(binary.BigEndian.Uint64(b[i : i+8]))
				i += 8

				v := b[i]
				i += 1
				if v == 1 {
					values = append(values, value.NewBooleanValue(un, true))
				} else {
					values = append(values, value.NewBooleanValue(un, false))
				}
			}
			w.Values[k] = values

		case stringEntryType:
			values := make([]value.Value, 0, nvals)
			for j := 0; j < nvals; j++ {
				if i+12 > len(b) {
					return ErrWALCorrupt
				}

				un := int64(binary.BigEndian.Uint64(b[i : i+8]))
				i += 8

				length := int(binary.BigEndian.Uint32(b[i : i+4]))
				if i+length > len(b) {
					return ErrWALCorrupt
				}

				i += 4

				if i+length > len(b) {
					return ErrWALCorrupt
				}

				v := string(b[i : i+length])
				i += length
				values = append(values, value.NewStringValue(un, v))
			}
			w.Values[k] = values

		default:
			return fmt.Errorf("unsupported value type: %#v", typ)
		}
	}
	return nil
}

// Type returns WriteWALEntryType.
func (w *WriteWALEntry) Type() WalEntryType {
	return WriteWALEntryType
}

// DeleteBucketRangeWALEntry represents the deletion of data in a bucket.
type DeleteBucketRangeWALEntry struct {
	OrgID      influxdb.ID
	BucketID   influxdb.ID
	Min, Max   int64
	Predicate  []byte
	KeepSeries bool
}

// MarshalBinary returns a binary representation of the entry in a new byte slice.
func (w *DeleteBucketRangeWALEntry) MarshalBinary() ([]byte, error) {
	b := make([]byte, w.MarshalSize())
	return w.Encode(b)
}

// UnmarshalBinary deserializes the byte slice into w.
func (w *DeleteBucketRangeWALEntry) UnmarshalBinary(b []byte) error {
	if len(b) < 2*influxdb.IDLength+16 {
		return ErrWALCorrupt
	}

	if err := w.OrgID.Decode(b[0:influxdb.IDLength]); err != nil {
		return err
	}
	if err := w.BucketID.Decode(b[influxdb.IDLength : 2*influxdb.IDLength]); err != nil {
		return err
	}
	w.Min = int64(binary.BigEndian.Uint64(b[2*influxdb.IDLength : 2*influxdb.IDLength+8]))
	w.Max = int64(binary.BigEndian.Uint64(b[2*influxdb.IDLength+8 : 2*influxdb.IDLength+16]))
	w.Predicate = b[2*influxdb.IDLength+16:]

	// Maintain backwards compatability where no predicate bytes means nil
	if len(w.Predicate) == 0 {
		w.Predicate = nil
	}

	return nil
}

// MarshalSize returns the number of bytes the entry takes when marshaled.
func (w *DeleteBucketRangeWALEntry) MarshalSize() int {
	return 2*influxdb.IDLength + 16 + len(w.Predicate)
}

// Encode converts the entry into a byte stream using b if it is large enough.
// If b is too small, a newly allocated slice is returned.
func (w *DeleteBucketRangeWALEntry) Encode(b []byte) ([]byte, error) {
	sz := w.MarshalSize()
	if len(b) < sz {
		b = make([]byte, sz)
	}

	orgID, err := w.OrgID.Encode()
	if err != nil {
		return nil, err
	}
	bucketID, err := w.BucketID.Encode()
	if err != nil {
		return nil, err
	}

	copy(b, orgID)
	copy(b[influxdb.IDLength:], bucketID)
	binary.BigEndian.PutUint64(b[2*influxdb.IDLength:], uint64(w.Min))
	binary.BigEndian.PutUint64(b[2*influxdb.IDLength+8:], uint64(w.Max))
	copy(b[2*influxdb.IDLength+16:], w.Predicate)

	return b[:sz], nil
}

// Type returns DeleteBucketRangeWALEntryType.
func (w *DeleteBucketRangeWALEntry) Type() WalEntryType {
	if w.KeepSeries {
		return DeleteBucketRangeKeepSeriesWALEntryType
	}
	return DeleteBucketRangeWALEntryType
}

// WALSegmentWriter writes WAL segments.
type WALSegmentWriter struct {
	bw   *bufio.Writer
	w    io.WriteCloser
	size int
}

// NewWALSegmentWriter returns a new WALSegmentWriter writing to w.
func NewWALSegmentWriter(w io.WriteCloser) *WALSegmentWriter {
	return &WALSegmentWriter{
		bw: bufio.NewWriterSize(w, 16*1024),
		w:  w,
	}
}

func (w *WALSegmentWriter) path() string {
	if f, ok := w.w.(*os.File); ok {
		return f.Name()
	}
	return ""
}

// Write writes entryType and the buffer containing compressed entry data.
func (w *WALSegmentWriter) Write(entryType WalEntryType, compressed []byte) error {
	var buf [5]byte
	buf[0] = byte(entryType)
	binary.BigEndian.PutUint32(buf[1:5], uint32(len(compressed)))

	if _, err := w.bw.Write(buf[:]); err != nil {
		return err
	}

	if _, err := w.bw.Write(compressed); err != nil {
		return err
	}

	w.size += len(buf) + len(compressed)

	return nil
}

// Sync flushes the file systems in-memory copy of recently written data to disk,
// if w is writing to an os.File.
func (w *WALSegmentWriter) sync() error {
	if err := w.bw.Flush(); err != nil {
		return err
	}

	if f, ok := w.w.(*os.File); ok {
		return f.Sync()
	}
	return nil
}

func (w *WALSegmentWriter) Flush() error {
	return w.bw.Flush()
}

func (w *WALSegmentWriter) close() error {
	if err := w.Flush(); err != nil {
		return err
	}
	return w.w.Close()
}

// WALSegmentReader reads WAL segments.
type WALSegmentReader struct {
	rc    io.ReadCloser
	r     *bufio.Reader
	entry WALEntry
	n     int64
	err   error
}

// NewWALSegmentReader returns a new WALSegmentReader reading from r.
func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader {
	return &WALSegmentReader{
		rc: r,
		r:  bufio.NewReader(r),
	}
}

func (r *WALSegmentReader) Reset(rc io.ReadCloser) {
	r.rc = rc
	r.r.Reset(rc)
	r.entry = nil
	r.n = 0
	r.err = nil
}

// Next indicates if there is a value to read.
func (r *WALSegmentReader) Next() bool {
	var nReadOK int

	// read the type and the length of the entry
	var lv [5]byte
	n, err := io.ReadFull(r.r, lv[:])
	if err == io.EOF {
		return false
	}

	if err != nil {
		r.err = err
		// We return true here because we want the client code to call read which
		// will return the this error to be handled.
		return true
	}
	nReadOK += n

	entryType := lv[0]
	length := binary.BigEndian.Uint32(lv[1:5])

	b := *(getBuf(int(length)))
	defer putBuf(&b)

	// read the compressed block and decompress it
	n, err = io.ReadFull(r.r, b[:length])
	if err != nil {
		r.err = err
		return true
	}
	nReadOK += n

	decLen, err := snappy.DecodedLen(b[:length])
	if err != nil {
		r.err = err
		return true
	}
	decBuf := *(getBuf(decLen))
	defer putBuf(&decBuf)

	data, err := snappy.Decode(decBuf, b[:length])
	if err != nil {
		r.err = err
		return true
	}

	// and marshal it and send it to the cache
	switch typ := WalEntryType(entryType); typ {
	case WriteWALEntryType:
		r.entry = &WriteWALEntry{
			Values: make(map[string][]value.Value),
		}
	case DeleteBucketRangeWALEntryType, DeleteBucketRangeKeepSeriesWALEntryType:
		r.entry = &DeleteBucketRangeWALEntry{
			KeepSeries: typ == DeleteBucketRangeKeepSeriesWALEntryType,
		}
	default:
		r.err = fmt.Errorf("unknown wal entry type: %v", entryType)
		return true
	}
	r.err = r.entry.UnmarshalBinary(data)
	if r.err == nil {
		// Read and decode of this entry was successful.
		r.n += int64(nReadOK)
	}

	return true
}

// Read returns the next entry in the reader.
func (r *WALSegmentReader) Read() (WALEntry, error) {
	if r.err != nil {
		return nil, r.err
	}
	return r.entry, nil
}

// Count returns the total number of bytes read successfully from the segment, as
// of the last call to Read(). The segment is guaranteed to be valid up to and
// including this number of bytes.
func (r *WALSegmentReader) Count() int64 {
	return r.n
}

// Error returns the last error encountered by the reader.
func (r *WALSegmentReader) Error() error {
	return r.err
}

// Close closes the underlying io.Reader.
func (r *WALSegmentReader) Close() error {
	if r.rc == nil {
		return nil
	}
	err := r.rc.Close()
	r.rc = nil
	return err
}

// idFromFileName parses the segment file ID from its name.
func idFromFileName(name string) (int, error) {
	parts := strings.Split(filepath.Base(name), ".")
	if len(parts) != 2 {
		return 0, fmt.Errorf("file %s has wrong name format to have an id", name)
	}

	id, err := strconv.ParseUint(parts[0][1:], 10, 32)

	return int(id), err
}
